【祝GA】 Lambda ExtensionsでLambdaのログをGCPのCloud Loggingに送信してみた
CX事業本部@大阪の岩田です。先日Lambda ExtensionsがGAされました(東京リージョンはまだですが)。Lambda ExtensionsはLambdaのモニタリング、可観測性、セキュリティ、ガバナンスのための運用ツールをLambdaに統合する機能で、Lambda実行環境のライフサイクルと連動して外部のサードパーティ製品に直接ログやメトリクスを送信するといったことが可能です。
パブリックプレビュー自体は半年以上前から利用できる状態だったのですが、これまでちゃんと触れていなかったので自分の理解を深めるために簡単なExtensionを実装してみようと思い立ちました。何を作るか考えたのですが、今回はLambdaのログをGCPのCloud Loggingに送信するExtensionを作ることにしました。なおLambdaのランタイムにはPython3.8を利用しています。
Lambda ExtensionsのおさらいとLogs API
以下のブログでも紹介されていますが、Extensionの実行形態は大きく内部モードと外部モードの2種類に分かれます。
外部モードはLambda実行環境で独立したプロセスとして実行され、関数呼び出しが完全に処理された後も引き続き実行されるという特徴を持ち、Lambdaのログや各種メトリクスをサードパーティに連携するのに適したモデルです。Lambda実行環境のライフサイクルINIT
フェーズにExtensions API経由で自作のコードをExtensionとして登録することで、INVOKE
フェーズ、SHUTDOWN
フェーズに自作のロジックをLambda関数本体と並行して実行できるようになります。
※画像は公式ドキュメントより引用
Logs API
前述のようにExtensionのコードはLambda関数本体と並行して実行可能なのですが、Lambda Extensionのユースケースの1つであるサードパーティへのログの連携はどのように実現すれば良いのでしょうか?ここで利用できるのがLambdaランタイムAPIの「Logs API」です。Lambdaのサービス基盤はランタイムのログを自動的にキャプチャしてCloudWatch Logsにストリームしているのですが、「Logs API」を利用するとLambda Extensionsからログストリームに直接サブスクライブできます。
Logs APIの仕様は以下の通りです
- パス: http://${AWS_LAMBDA_RUNTIME_API}/2020-08-15/logs/
- メソッド: PUT
- リクエストパラメータ:
- destination...
- protocol... Lambdaの実行基盤がExtensionに対してログを配信するためのプロトコルを指定します。
HTTP
もしくはTCP
が指定可能でHTTP
の利用が推奨されています。 - URI... Extensionがログの配信を待ち受けるエンドポイントのURIを指定します。Extensionの開発者はこのURIでLambdaランタイムからのリクエストを待ち受けるようにExtensionを実装する必要があります。
- protocol... Lambdaの実行基盤がExtensionに対してログを配信するためのプロトコルを指定します。
- buffering...Lambdaの実行基盤はログをバッファリングしてサブスクライバ(Extension)に配信します。このパラメータはバッファリング関連の挙動を調整するために利用します。
- maxItems... ログのバッチをバッファーする最大時間(ミリ秒単位)。デフォルト:1000 最小: 25 最大:30000
- maxBytes... メモリにバッファするログの最大サイズ (バイト単位)。デフォルト:262,144 最小:262,144 最大:1,048,576
- timeoutMs... メモリにバッファするイベントの最大数。デフォルト:10,000 最小:1,000 最大:10,000
- types...Extensionがサブスクライブするログの種別を配列形式で指定します。指定可能なログ種別は
platform
,function
,extension
の3種です
- destination...
リクエストのサンプルは以下のような形式になります
PUT http://${AWS_LAMBDA_RUNTIME_API}/2020-08-15/logs/ HTTP/1.1 { "schemaVersion": "2020-08-15", "types": [ "platform", "function" ], "buffering": { "maxItems": 1000, "maxBytes": 10240, "timeoutMs": 100 } "destination": { "protocol": "HTTP", "URI": "http://sandbox.localdomain:8080/lambda_logs" } }
この例ではhttp://sandbox.localdomain:8080/lambda_logs
というエンドポイントでプラットフォームのログと関数のログをサブスクライブしています。サブスクライブが完了すると、以後Lambda実行基盤のライフサイクルに合わせてExtensionのエンドポイントに対してJSON形式のログがPOSTされるようになります。例えばLambda Functionのコードからprint('some message')
というコードでログを出力すると、以下のようなJSONデータがExtensionの待ち受けエンドポイントに自動的にPOSTされてきます。
{ type: "function" record: "some message" time: "2021-05-31T11:59:09.154Z" }
あとはExtension側でこのPOSTされてきたデータをサードパーティによしなに連携するだけです。
Extensionを作ってGCPにログを送ってみる
実際にExtensionを構築してLambdaからGCPにログを送ってみましょう
GCP側の事前作業
まずCloud Loggingにログを書き込むためのサービスアカウントを作成します
$ gcloud iam service-accounts create <サービスアカウント名>
作成したサービスアカウントにlogging.logWriter
のロールをアタッチします。これでサービスアカウントからCloud Loggingにログを書き込めるようになります。
$ gcloud projects add-iam-policy-binding <プロジェクトID> --member=serviceAccount:<サービスアカウント名>@<プロジェクトID>.iam.gserviceaccount.com --role=roles/logging.logWriter
Lambdaからサービスアカウントを利用するためにサービスアカウントキーを発行します
$ gcloud iam service-accounts keys create <適当なファイル名> --iam-account <サービスアカウント名>@<プロジェクトID>.iam.gserviceaccount.com
指定したファイル名でサービスアカウントキーのJSONファイルが作成されるので中身を確認してみましょう
$cat <適当なファイル名> { "type": "service_account", "project_id": "<プロジェクトID>", "private_key_id": "c9818...略", "private_key": "-----BEGIN PRIVATE KEY-----\n...略==\n-----END PRIVATE KEY-----\n", "client_email": "<サービスアカウント名>@<プロジェクトID>.iam.gserviceaccount.com", "client_id": "123456789012345678901", "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/<サービスアカウント名>%40<プロジェクトID>.iam.gserviceaccount.com" }
こんな感じのJSONが表示されればOKです
AWS側の事前作業
先程作成したGCPのサービスアカウントキーをSecret Managerに登録しておきましょう。
$aws secretsmanager create-secret --name <シークレット名> --secret-string file://<サービスアカウントキーのJSONファイル名>
後ほどExtensionの初期処理でSecret Managerからサービスアカウントキーを取得して利用します
Extensionの実装
ここから実際にExtensionを実装していきます。以下のGitHubリポジトリでLambda Extensionsの各種サンプルが公開されているのですが、この中の
s3-logs-extension-demo-zip-archive
というディレクトリにExtensionからS3バケットに直接ログを送信するサンプルが含まれています。今回はこのサンプルをベースに微修正して実装します。
https://github.com/aws-samples/aws-lambda-extensions
まずextensionssrc/requirements.txt
にCloud Loggingのクライアントライブラリを追加します
boto3 google-cloud-logging
これで後ほどsam build
した時にCloud Loggingのクライアントライブラリがデプロイパッケージに組み込まれるようになりました。続いてextensions/logs_api_http_extension.py
のコードを修正します。まず先頭のライブラリ部分に必要なライブラリのimportを追加します。
import json from google.cloud import logging from google.oauth2 import service_account
続いてログ送信の本体部分run_forever
の修正です。まずは初期化処理部分にSecret ManagerからGCPのサービスアカウントキーを取得してクライアントライブラリを初期化する処理を追加します。
def run_forever(self): client = boto3.client('secretsmanager') res = client.get_secret_value(SecretId=os.environ['GCP_SECRET_NAME']) json_acct_info = json.loads(res['SecretString']) credentials = service_account.Credentials.from_service_account_info(json_acct_info) scoped_credentials = credentials.with_scopes( ['https://www.googleapis.com/auth/cloud-platform']) logging_client = logging.Client(credentials=scoped_credentials) log_name = "awslambda" logger = logging_client.logger(log_name) print(f"extension.logs_api_http_extension: Receiving Logs {self.agent_name}")
続いて実際のログ送信部分です。LambdaのLogs APIから取得したログメッセージをCloud Loggingのクライアントのlog_struct
を使って送信します。
while True: resp = self.extensions_api_client.next(self.agent_id) # Process the received batches if any. while not self.queue.empty(): batch = self.queue.get_nowait() # ...略 try: for item in range(len(batch)): response = logger.log_struct(batch[item]) except Exception as e: raise Exception(f"Error sending log to S3 {e}") from e
extensions/logs_api_http_extension.py
の全体です。
#!/bin/sh # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 ''''exec python -u -- "$0" ${1+"$@"} # ''' import os import sys from pathlib import Path from datetime import datetime import json # Add lib folder to path to import boto3 library. # Normally with Lambda Layers, python libraries are put into the /python folder which is in the path. # As this extension is bringing its own Python runtime, and running a separate process, the path is not available. # Hence, having the files in a different folder and adding it to the path, makes it available. lib_folder = Path(__file__).parent / "lib" sys.path.insert(0,str(lib_folder)) import boto3 from google.cloud import logging from google.oauth2 import service_account from logs_api_http_extension.http_listener import http_server_init, RECEIVER_PORT from logs_api_http_extension.logs_api_client import LogsAPIClient from logs_api_http_extension.extensions_api_client import ExtensionsAPIClient from queue import Queue """Here is the sample extension code. - The extension runs two threads. The "main" thread, will register with the Extension API and process its invoke and shutdown events (see next call). The second "listener" thread listens for HTTP POST events that deliver log batches. - The "listener" thread places every log batch it receives in a synchronized queue; during each execution slice, the "main" thread will make sure to process any event in the queue before returning control by invoking next again. - Note that because of the asynchronous nature of the system, it is possible that logs for one invoke are processed during the next invoke slice. Likewise, it is possible that logs for the last invoke are processed during the SHUTDOWN event. Note: 1. This is a simple example extension to help you understand the Lambda Logs API. This code is not production ready. Use it with your own discretion after testing it thoroughly. 2. The extension code starts with a shebang. This is to bring Python runtime to the execution environment. This works if the lambda function is a python3.x function, therefore it brings the python3.x runtime with itself. It may not work for python 2.7 or other runtimes. The recommended best practice is to compile your extension into an executable binary and not rely on the runtime. 3. This file needs to be executable, so make sure you add execute permission to the file `chmod +x logs_api_http_extension.py` """ class LogsAPIHTTPExtension(): def __init__(self, agent_name, registration_body, subscription_body): # print(f"extension.logs_api_http_extension: Initializing LogsAPIExternalExtension {agent_name}") self.agent_name = agent_name self.queue = Queue() self.logs_api_client = LogsAPIClient() self.extensions_api_client = ExtensionsAPIClient() # Register early so Runtime could start in parallel self.agent_id = self.extensions_api_client.register(self.agent_name, registration_body) # Start listening before Logs API registration # print(f"extension.logs_api_http_extension: Starting HTTP Server {agent_name}") http_server_init(self.queue) self.logs_api_client.subscribe(self.agent_id, subscription_body) def run_forever(self): client = boto3.client('secretsmanager') res = client.get_secret_value(SecretId=os.environ['GCP_SECRET_NAME']) json_acct_info = json.loads(res['SecretString']) credentials = service_account.Credentials.from_service_account_info(json_acct_info) scoped_credentials = credentials.with_scopes( ['https://www.googleapis.com/auth/cloud-platform']) logging_client = logging.Client(credentials=scoped_credentials) log_name = "awslambda" logger = logging_client.logger(log_name) print(f"extension.logs_api_http_extension: Receiving Logs {self.agent_name}") while True: resp = self.extensions_api_client.next(self.agent_id) # Process the received batches if any. while not self.queue.empty(): batch = self.queue.get_nowait() # This following line logs the events received to CloudWatch. # Replace it to send logs to elsewhere. # If you've subscribed to extension logs, e.g. "types": ["platform", "function", "extension"], # you'll receive the logs of this extension back from Logs API. # And if you log it again with the line below, it will create a cycle since you receive it back again. # Use `extension` log type if you'll egress it to another endpoint, # or make sure you've implemented a protocol to handle this case. # print(f"Log Batch Received from Lambda: {batch}", flush=True) # There are two options illustrated: # 1. Sending the entire log batch to S3 # 2. Parsing the batch and sending individual log lines. # This could be used to parse the log lines and only selectively send logs to S3, or amend for any other destination. # 1. The following line writes the entire batch to S3 try: for item in range(len(batch)): response = logger.log_struct(batch[item]) except Exception as e: raise Exception(f"Error sending log to GCP {e}") from e # Register for the INVOKE events from the EXTENSIONS API _REGISTRATION_BODY = { "events": ["INVOKE", "SHUTDOWN"], } # Subscribe to platform logs and receive them on ${local_ip}:4243 via HTTP protocol. TIMEOUT_MS = 1000 # Maximum time (in milliseconds) that a batch is buffered. MAX_BYTES = 262144 # Maximum size in bytes that the logs are buffered in memory. MAX_ITEMS = 10000 # Maximum number of events that are buffered in memory. _SUBSCRIPTION_BODY = { "destination":{ "protocol": "HTTP", "URI": f"http://sandbox:{RECEIVER_PORT}", }, "types": ["platform", "function"], "buffering": { "timeoutMs": TIMEOUT_MS, "maxBytes": MAX_BYTES, "maxItems": MAX_ITEMS } } def main(): # print(f"extension.logs_api_http_extension: Starting Extension {_REGISTRATION_BODY} {_SUBSCRIPTION_BODY}") # Note: Agent name has to be file name to register as an external extension ext = LogsAPIHTTPExtension(os.path.basename(__file__), _REGISTRATION_BODY, _SUBSCRIPTION_BODY) ext.run_forever() if __name__ == "__main__": main()
SAMテンプレートを以下のように修正します。修正点は以下の通りです
- S3関連の記述を削除
- Secret Managerのシークレット名を受け取るためのパラメータを追加
- Secret Manager用の権限を追加
AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: Lambda Extensions S3 Logs Demo ########################################################################## # Parameters & Globals # ########################################################################## Globals: Function: Tracing: Active Tags: Application: S3LogsExtensionDemo Parameters: SecretName: Type: String Resources: ########################################################################## # Lambda functions # ########################################################################## Function: Type: AWS::Serverless::Function Properties: FunctionName: logs-extension-demo-function Description: Logs Extension Demo Function CodeUri: functionsrc/ Runtime: python3.8 Handler: lambda_function.lambda_handler MemorySize: 128 Timeout: 100 Environment: Variables: GCP_SECRET_NAME: Ref: SecretName Layers: - !Ref S3LogExtensionsLayer Policies: - AWSSecretsManagerGetSecretValuePolicy: SecretArn: !Sub arn:aws:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretName}* ########################################################################## # Lambda layers # ########################################################################## S3LogExtensionsLayer: Type: AWS::Serverless::LayerVersion Properties: Description: Layer containing extension(s) ContentUri: extensionssrc/ CompatibleRuntimes: - python3.8 LicenseInfo: 'Available under the MIT-0 license.' RetentionPolicy: Delete Metadata: BuildMethod: makefile ########################################################################## # OUTPUTS # ########################################################################## Outputs: ExtensionsLayer: Value: !Ref S3LogExtensionsLayer Description: Log Extension Layer ARN Function: Value: !Ref Function Description: Lambda Function
Diffはこんな感じです
--- a/s3-logs-extension-demo-zip-archive/template.yml +++ b/s3-logs-extension-demo-zip-archive/template.yml @@ -9,6 +9,9 @@ Globals: Tracing: Active Tags: Application: S3LogsExtensionDemo +Parameters: + SecretName: + Type: String Resources: ########################################################################## # Lambda functions # @@ -25,13 +28,14 @@ Resources: Timeout: 100 Environment: Variables: - S3_BUCKET_NAME: - Ref: LogExtensionsBucket + GCP_SECRET_NAME: + Ref: SecretName Layers: - !Ref S3LogExtensionsLayer Policies: - - S3WritePolicy: - BucketName: !Ref LogExtensionsBucket + - AWSSecretsManagerGetSecretValuePolicy: + SecretArn: !Sub arn:aws:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretName}* + ########################################################################## # Lambda layers # ########################################################################## @@ -47,17 +51,6 @@ Resources: Metadata: BuildMethod: makefile ########################################################################## -# S3 Resources # -########################################################################## - LogExtensionsBucket: - Type: 'AWS::S3::Bucket' - Properties: - LifecycleConfiguration: - Rules: - - Id: DeleteAfterSevenDays - Status: "Enabled" - ExpirationInDays: 7 -########################################################################## # OUTPUTS # ########################################################################## Outputs: @@ -67,5 +60,3 @@ Outputs: Function: Value: !Ref Function Description: Lambda Function - LogExtensionsBucketName: - Value: !Ref LogExtensionsBucket
あまり本質的な部分ではないですが、せっかくなのでLambda Function本体が出力するログメッセージも修正しておきましょう
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 import json import os def lambda_handler(event, context): print(f"Function: Logging something which logging extension will send to GCP Cloud Logging") return { 'statusCode': 200, 'body': json.dumps('Hello from Lambda!') }
デプロイ&テスト
一通り準備ができたのでデプロイ&テストしてみましょう。まずはビルドしてデプロイパッケージを準備します
$sam build
パッケージの準備ができたらデプロイします
$sam deploy --guided --parameter-overrides SecretName=<シークレットマネージャのシークレット名>
デプロイできたら適当に何度かLambdaをテスト実行してからGCPのClooud Loggingを確認してみましょう。
バッチリです!GCPからAWSのログが確認できました
まとめ
LambdaのログをGCPに送信することの意義はさておきExtensionの仕組みを理解するための良い取っ掛かりになりました。紹介したサンプルリポジトリにはPython以外にも様々なランタイムのサンプルが公開されており、実際に手を動かしならがLambda Extensionsについて学ぶための良い教材となっています。まだLambda Extensionsを試したことのない方は是非ともお試し下さい。
参考
- https://github.com/aws-samples/aws-lambda-extensions
- https://docs.aws.amazon.com/lambda/latest/dg/runtimes-logs-api.html
- https://cloud.google.com/logging/docs/reference/libraries?hl=ja#logging_write_log_entry-python
- https://googleapis.dev/python/google-auth/latest/user-guide.html#service-account-private-key-files